package com.process;

import canal.bean.RowData;
import com.alibaba.fastjson.JSON;
import com.base.MySqlBaseETL;
import com.bean.OrderDBEntity;
import com.utils.KafkaUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.math.BigDecimal;

/**
 * @Description: 订单实时ETL处理  (TODO)
 * @Author: lichen
 * @Times : 2021/8/15 19:21
 */
public class OrderETL {

    public static void main(String[] args) throws Exception {

        OrderETL etl = new OrderETL();

        etl.process();

    }

    public void process() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        MySqlBaseETL mysqlEtl = new MySqlBaseETL();
        //1.只过滤出来 foo_goods 表的日志，并进行转换
        DataStream<RowData> ordersCanalDS = mysqlEtl.KafkaConsummer("ods_shop", env);
        DataStream<RowData> filter = ordersCanalDS.filter(new FilterFunction<RowData>() {

            @Override
            public boolean filter(RowData canalRowData) throws Exception {
                return canalRowData.getTableName().equals("foo_orders");
            }

        }).filter(new RichFilterFunction<RowData>() {

            @Override
            public boolean filter(RowData canalRowData) throws Exception {
                return canalRowData.getEventType().equals("insert");
            }

        });
        filter.print(">");



        //2：将订单表数据转换成OrderDBEntity对象.
        DataStream<OrderDBEntity> orderDBEntityDataStream = filter.map(new MapFunction<RowData, OrderDBEntity>() {
            @Override
            public OrderDBEntity map(RowData canalRowData) throws Exception {
                OrderDBEntity order = new OrderDBEntity();
                order.setUserId(Long.valueOf(canalRowData.getColumns().get("orderId")));
                order.setOrderNo(canalRowData.getColumns().get("orderNo"));
                order.setUserId(Long.valueOf(canalRowData.getColumns().get("userId")));
                order.setOrderStatus(Integer.valueOf(canalRowData.getColumns().get("orderStatus")));
                order.setGoodsMoney(Double.valueOf(canalRowData.getColumns().get("goodsMoney")));
                order.setDeliverType(Integer.valueOf(canalRowData.getColumns().get("deliverType")));
                order.setTotalMoney(new BigDecimal(canalRowData.getColumns().get("totalMoney")));
                order.setRealTotalMoney(new BigDecimal(canalRowData.getColumns().get("realTotalMoney")));
                order.setPayType(Integer.valueOf(canalRowData.getColumns().get("payType")));
                order.setIsPay(Integer.valueOf(canalRowData.getColumns().get("isPay")));
                order.setAreaId(Integer.valueOf(canalRowData.getColumns().get("areaId")));
                order.setAreaIdPath(canalRowData.getColumns().get("areaIdPath"));
                order.setUserName(canalRowData.getColumns().get("userName"));
                order.setUserAddress(canalRowData.getColumns().get("userAddress"));
                order.setUserPhone(canalRowData.getColumns().get("userPhone"));
                order.setOrderScore(Integer.valueOf(canalRowData.getColumns().get("orderScore")));
                order.setIsInvoice(Integer.valueOf(canalRowData.getColumns().get("isInvoice")));
                order.setInvoiceClient(canalRowData.getColumns().get("invoiceClient"));
                order.setOrderRemarks(canalRowData.getColumns().get("orderRemarks"));
                order.setOrderSrc(Integer.valueOf(canalRowData.getColumns().get("orderSrc")));
                order.setNeedPay(Double.valueOf(canalRowData.getColumns().get("needPay")));
                order.setPayRand(Integer.valueOf(canalRowData.getColumns().get("payRand")));
                order.setOrderType(Integer.valueOf(canalRowData.getColumns().get("orderType")));
                order.setIsRefund(Integer.valueOf(canalRowData.getColumns().get("isRefund")));
                order.setIsAppraise(Integer.valueOf(canalRowData.getColumns().get("isAppraise")));
                order.setCancelReason(Integer.valueOf(canalRowData.getColumns().get("cancelReason")));
                order.setRejectReason(Integer.valueOf(canalRowData.getColumns().get("rejectReason")));
                order.setRejectOtherReason(canalRowData.getColumns().get("rejectOtherReason"));
                order.setIsClosed(Integer.parseInt(canalRowData.getColumns().get("isClosed")));
                order.setGoodsSearchKeys(canalRowData.getColumns().get("goodsSearchKeys"));
                order.setOrderunique(canalRowData.getColumns().get("orderunique"));
                order.setReceiveTime(canalRowData.getColumns().get("receiveTime"));
                order.setDeliveryTime(canalRowData.getColumns().get("deliveryTime"));
                order.setTradeNo(canalRowData.getColumns().get("tradeNo"));
                order.setDataFlag(Integer.valueOf(canalRowData.getColumns().get("dataFlag")));
                order.setCreateTime(canalRowData.getColumns().get("createTime"));
                order.setSettlementId(Integer.valueOf(canalRowData.getColumns().get("settlementId")));
                order.setCommissionFee(Double.valueOf(canalRowData.getColumns().get("commissionFee")));
                order.setScoreMoney(Double.valueOf(canalRowData.getColumns().get("scoreMoney")));
                order.setUseScore(Integer.valueOf(canalRowData.getColumns().get("useScore")));
                order.setOrderCode(canalRowData.getColumns().get("orderCode"));
                order.setExtraJson(canalRowData.getColumns().get("extraJson"));
                order.setOrderCodeTargetId(Integer.valueOf(canalRowData.getColumns().get("orderCodeTargetId")));
                order.setNoticeDeliver(Integer.valueOf(canalRowData.getColumns().get("noticeDeliver")));
                order.setInvoiceJson(canalRowData.getColumns().get("invoiceJson"));
                order.setLockCashMoney(Double.valueOf(canalRowData.getColumns().get("lockCashMoney")));
                order.setPayTime(canalRowData.getColumns().get("payTime"));
                order.setIsBatch(Integer.valueOf(canalRowData.getColumns().get("isBatch")));
                order.setTotalPayFee(new BigDecimal(canalRowData.getColumns().get("totalPayFee")));
                order.setIsFromCart(canalRowData.getColumns().get("isFromCart"));

                return order;
            }
        });

        //将对象转换为字符串
        DataStream<String> orderJsonDataStream = orderDBEntityDataStream.map(new MapFunction<OrderDBEntity, String>() {
            @Override
            public String map(OrderDBEntity orderDBEntity) throws Exception {
                return JSON.toJSONString(orderDBEntity);
            }
        });

        // 将数据写入到KAFKA
        orderJsonDataStream.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(KafkaUtil.kafkaCons);

                ProducerRecord<String, String> logs = new ProducerRecord<>("dwd_really_order", null, value);

                kafkaProducer.send(logs);
            }
        });

        env.execute();
    }


}
